/*
 * This file Copyright (C) 2007-2014 Mnemosyne LLC
 *
 * It may be used under the GNU GPL versions 2 or 3
 * or any future license endorsed by Mnemosyne LLC.
 *
 */

#include <errno.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>

#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event.h>

#include "transmission.h"
#include "cache.h"
#include "completion.h"
#include "file.h"
#include "log.h"
#include "peer-io.h"
#include "peer-mgr.h"
#include "peer-msgs.h"
#include "session.h"
#include "torrent.h"
#include "torrent-magnet.h"
#include "tr-assert.h"
#include "tr-dht.h"
#include "utils.h"
#include "variant.h"
#include "version.h"

#ifndef EBADMSG
#define EBADMSG EINVAL
#endif

/**
***
**/

enum
{
    BT_CHOKE = 0,
    BT_UNCHOKE = 1,
    BT_INTERESTED = 2,
    BT_NOT_INTERESTED = 3,
    BT_HAVE = 4,
    BT_BITFIELD = 5,
    BT_REQUEST = 6,
    BT_PIECE = 7,
    BT_CANCEL = 8,
    BT_PORT = 9,
    /* */
    BT_FEXT_SUGGEST = 13,
    BT_FEXT_HAVE_ALL = 14,
    BT_FEXT_HAVE_NONE = 15,
    BT_FEXT_REJECT = 16,
    BT_FEXT_ALLOWED_FAST = 17,
    /* */
    BT_LTEP = 20,
    /* */
    LTEP_HANDSHAKE = 0,
    /* */
    UT_PEX_ID = 1,
    UT_METADATA_ID = 3,
    /* */
    MAX_PEX_PEER_COUNT = 50,
    /* */
    MIN_CHOKE_PERIOD_SEC = 10,
    /* idle seconds before we send a keepalive */
    KEEPALIVE_INTERVAL_SECS = 100,
    /* */
    PEX_INTERVAL_SECS = 90, /* sec between sendPex() calls */
    /* */
    REQQ = 512,
    /* */
    METADATA_REQQ = 64,
    /* */
    MAGIC_NUMBER = 21549,
    /* used in lowering the outMessages queue period */
    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
    HIGH_PRIORITY_INTERVAL_SECS = 2,
    LOW_PRIORITY_INTERVAL_SECS = 10,
    /* number of pieces we'll allow in our fast set */
    MAX_FAST_SET_SIZE = 3,
    /* how many blocks to keep prefetched per peer */
    PREFETCH_SIZE = 18,
    /* when we're making requests from another peer,
       batch them together to send enough requests to
       meet our bandwidth goals for the next N seconds */
    REQUEST_BUF_SECS = 10,
    /* defined in BEP #9 */
    METADATA_MSG_TYPE_REQUEST = 0,
    METADATA_MSG_TYPE_DATA = 1,
    METADATA_MSG_TYPE_REJECT = 2
};

enum
{
    AWAITING_BT_LENGTH,
    AWAITING_BT_ID,
    AWAITING_BT_MESSAGE,
    AWAITING_BT_PIECE
};

typedef enum
{
    ENCRYPTION_PREFERENCE_UNKNOWN,
    ENCRYPTION_PREFERENCE_YES,
    ENCRYPTION_PREFERENCE_NO
}
encryption_preference_t;

/**
***
**/

struct peer_request
{
    uint32_t index;
    uint32_t offset;
    uint32_t length;
};

static void blockToReq(tr_torrent const* tor, tr_block_index_t block, struct peer_request* setme)
{
    tr_torrentGetBlockLocation(tor, block, &setme->index, &setme->offset, &setme->length);
}

/**
***
**/

/* this is raw, unchanged data from the peer regarding
 * the current message that it's sending us. */
struct tr_incoming
{
    uint8_t id;
    uint32_t length; /* includes the +1 for id length */
    struct peer_request blockReq; /* metadata for incoming blocks */
    struct evbuffer* block; /* piece data for incoming blocks */
};

/**
 * Low-level communication state information about a connected peer.
 *
 * This structure remembers the low-level protocol states that we're
 * in with this peer, such as active requests, pex messages, and so on.
 * Its fields are all private to peer-msgs.c.
 *
 * Data not directly involved with sending & receiving messages is
 * stored in tr_peer, where it can be accessed by both peermsgs and
 * the peer manager.
 *
 * @see struct peer_atom
 * @see tr_peer
 */
struct tr_peerMsgs
{
    struct tr_peer peer; /* parent */

    uint16_t magic_number;

    /* Whether or not we've choked this peer. */
    bool peer_is_choked;

    /* whether or not the peer has indicated it will download from us. */
    bool peer_is_interested;

    /* whether or not the peer is choking us. */
    bool client_is_choked;

    /* whether or not we've indicated to the peer that we would download from them if unchoked. */
    bool client_is_interested;

    bool peerSupportsPex;
    bool peerSupportsMetadataXfer;
    bool clientSentLtepHandshake;
    bool peerSentLtepHandshake;

    /*bool haveFastSet;*/

    int desiredRequestCount;

    int prefetchCount;

    int is_active[2];

    /* how long the outMessages batch should be allowed to grow before
     * it's flushed -- some messages (like requests >:) should be sent
     * very quickly; others aren't as urgent. */
    int8_t outMessagesBatchPeriod;

    uint8_t state;
    uint8_t ut_pex_id;
    uint8_t ut_metadata_id;
    uint16_t pexCount;
    uint16_t pexCount6;

    tr_port dht_port;

    encryption_preference_t encryption_preference;

    size_t metadata_size_hint;
#if 0
    size_t fastsetSize;
    tr_piece_index_t fastset[MAX_FAST_SET_SIZE];
#endif

    tr_torrent* torrent;

    tr_peer_callback callback;
    void* callbackData;

    struct evbuffer* outMessages; /* all the non-piece messages */

    struct peer_request peerAskedFor[REQQ];

    int peerAskedForMetadata[METADATA_REQQ];
    int peerAskedForMetadataCount;

    tr_pex* pex;
    tr_pex* pex6;

    /*time_t clientSentPexAt;*/
    time_t clientSentAnythingAt;

    time_t chokeChangedAt;

    /* when we started batching the outMessages */
    time_t outMessagesBatchedAt;

    struct tr_incoming incoming;

    /* if the peer supports the Extension Protocol in BEP 10 and
       supplied a reqq argument, it's stored here. Otherwise, the
       value is zero and should be ignored. */
    int64_t reqq;

    struct event* pexTimer;

    struct tr_peerIo* io;
};

/**
***
**/

static inline tr_session* getSession(struct tr_peerMsgs* msgs)
{
    return msgs->torrent->session;
}

/**
***
**/

static void myDebug(char const* file, int line, struct tr_peerMsgs const* msgs, char const* fmt, ...) TR_GNUC_PRINTF(4, 5);

static void myDebug(char const* file, int line, struct tr_peerMsgs const* msgs, char const* fmt, ...)
{
    tr_sys_file_t const fp = tr_logGetFile();

    if (fp != TR_BAD_SYS_FILE)
    {
        va_list args;
        char timestr[64];
        struct evbuffer* buf = evbuffer_new();
        char* base = tr_sys_path_basename(file, NULL);
        char* message;

        evbuffer_add_printf(buf, "[%s] %s - %s [%s]: ", tr_logGetTimeStr(timestr, sizeof(timestr)), tr_torrentName(
            msgs->torrent), tr_peerIoGetAddrStr(msgs->io), tr_quark_get_string(msgs->peer.client, NULL));
        va_start(args, fmt);
        evbuffer_add_vprintf(buf, fmt, args);
        va_end(args);
        evbuffer_add_printf(buf, " (%s:%d)", base, line);

        message = evbuffer_free_to_str(buf, NULL);
        tr_sys_file_write_line(fp, message, NULL);

        tr_free(base);
        tr_free(message);
    }
}

#define dbgmsg(msgs, ...) \
    do \
    { \
        if (tr_logGetDeepEnabled()) \
        { \
            myDebug(__FILE__, __LINE__, msgs, __VA_ARGS__); \
        } \
    } \
    while (0)

/**
***
**/

static void pokeBatchPeriod(tr_peerMsgs* msgs, int interval)
{
    if (msgs->outMessagesBatchPeriod > interval)
    {
        msgs->outMessagesBatchPeriod = interval;
        dbgmsg(msgs, "lowering batch interval to %d seconds", interval);
    }
}

static void dbgOutMessageLen(tr_peerMsgs* msgs)
{
    dbgmsg(msgs, "outMessage size is now %zu", evbuffer_get_length(msgs->outMessages));
}

static void protocolSendReject(tr_peerMsgs* msgs, struct peer_request const* req)
{
    TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));

    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t));
    evbuffer_add_uint8(out, BT_FEXT_REJECT);
    evbuffer_add_uint32(out, req->index);
    evbuffer_add_uint32(out, req->offset);
    evbuffer_add_uint32(out, req->length);

    dbgmsg(msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
    dbgOutMessageLen(msgs);
}

static void protocolSendRequest(tr_peerMsgs* msgs, struct peer_request const* req)
{
    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t));
    evbuffer_add_uint8(out, BT_REQUEST);
    evbuffer_add_uint32(out, req->index);
    evbuffer_add_uint32(out, req->offset);
    evbuffer_add_uint32(out, req->length);

    dbgmsg(msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length);
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}

static void protocolSendCancel(tr_peerMsgs* msgs, struct peer_request const* req)
{
    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t));
    evbuffer_add_uint8(out, BT_CANCEL);
    evbuffer_add_uint32(out, req->index);
    evbuffer_add_uint32(out, req->offset);
    evbuffer_add_uint32(out, req->length);

    dbgmsg(msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length);
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}

static void protocolSendPort(tr_peerMsgs* msgs, uint16_t port)
{
    struct evbuffer* out = msgs->outMessages;

    dbgmsg(msgs, "sending Port %u", port);
    evbuffer_add_uint32(out, 3);
    evbuffer_add_uint8(out, BT_PORT);
    evbuffer_add_uint16(out, port);
}

static void protocolSendHave(tr_peerMsgs* msgs, uint32_t index)
{
    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t) + sizeof(uint32_t));
    evbuffer_add_uint8(out, BT_HAVE);
    evbuffer_add_uint32(out, index);

    dbgmsg(msgs, "sending Have %u", index);
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, LOW_PRIORITY_INTERVAL_SECS);
}

#if 0

static void protocolSendAllowedFast(tr_peerMsgs* msgs, uint32_t pieceIndex)
{
    TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));

    tr_peerIo* io = msgs->io;
    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(io, out, sizeof(uint8_t) + sizeof(uint32_t));
    evbuffer_add_uint8(io, out, BT_FEXT_ALLOWED_FAST);
    evbuffer_add_uint32(io, out, pieceIndex);

    dbgmsg(msgs, "sending Allowed Fast %u...", pieceIndex);
    dbgOutMessageLen(msgs);
}

#endif

static void protocolSendChoke(tr_peerMsgs* msgs, int choke)
{
    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t));
    evbuffer_add_uint8(out, choke ? BT_CHOKE : BT_UNCHOKE);

    dbgmsg(msgs, "sending %s...", choke ? "Choke" : "Unchoke");
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}

static void protocolSendHaveAll(tr_peerMsgs* msgs)
{
    TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));

    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t));
    evbuffer_add_uint8(out, BT_FEXT_HAVE_ALL);

    dbgmsg(msgs, "sending HAVE_ALL...");
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}

static void protocolSendHaveNone(tr_peerMsgs* msgs)
{
    TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));

    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t));
    evbuffer_add_uint8(out, BT_FEXT_HAVE_NONE);

    dbgmsg(msgs, "sending HAVE_NONE...");
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}

/**
***  EVENTS
**/

static void publish(tr_peerMsgs* msgs, tr_peer_event* e)
{
    if (msgs->callback != NULL)
    {
        (*msgs->callback)(&msgs->peer, e, msgs->callbackData);
    }
}

static void fireError(tr_peerMsgs* msgs, int err)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.eventType = TR_PEER_ERROR;
    e.err = err;
    publish(msgs, &e);
}

static void fireGotBlock(tr_peerMsgs* msgs, struct peer_request const* req)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
    e.pieceIndex = req->index;
    e.offset = req->offset;
    e.length = req->length;
    publish(msgs, &e);
}

static void fireGotRej(tr_peerMsgs* msgs, struct peer_request const* req)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.eventType = TR_PEER_CLIENT_GOT_REJ;
    e.pieceIndex = req->index;
    e.offset = req->offset;
    e.length = req->length;
    publish(msgs, &e);
}

static void fireGotChoke(tr_peerMsgs* msgs)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
    publish(msgs, &e);
}

static void fireClientGotHaveAll(tr_peerMsgs* msgs)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
    publish(msgs, &e);
}

static void fireClientGotHaveNone(tr_peerMsgs* msgs)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
    publish(msgs, &e);
}

static void fireClientGotPieceData(tr_peerMsgs* msgs, uint32_t length)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.length = length;
    e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
    publish(msgs, &e);
}

static void firePeerGotPieceData(tr_peerMsgs* msgs, uint32_t length)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.length = length;
    e.eventType = TR_PEER_PEER_GOT_PIECE_DATA;
    publish(msgs, &e);
}

static void fireClientGotSuggest(tr_peerMsgs* msgs, uint32_t pieceIndex)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
    e.pieceIndex = pieceIndex;
    publish(msgs, &e);
}

static void fireClientGotPort(tr_peerMsgs* msgs, tr_port port)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.eventType = TR_PEER_CLIENT_GOT_PORT;
    e.port = port;
    publish(msgs, &e);
}

static void fireClientGotAllowedFast(tr_peerMsgs* msgs, uint32_t pieceIndex)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
    e.pieceIndex = pieceIndex;
    publish(msgs, &e);
}

static void fireClientGotBitfield(tr_peerMsgs* msgs, tr_bitfield* bitfield)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
    e.bitfield = bitfield;
    publish(msgs, &e);
}

static void fireClientGotHave(tr_peerMsgs* msgs, tr_piece_index_t index)
{
    tr_peer_event e = TR_PEER_EVENT_INIT;
    e.eventType = TR_PEER_CLIENT_GOT_HAVE;
    e.pieceIndex = index;
    publish(msgs, &e);
}

/**
***  ALLOWED FAST SET
***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
**/

#if 0

size_t tr_generateAllowedSet(tr_piece_index_t* setmePieces, size_t desiredSetSize, size_t pieceCount, uint8_t const* infohash,
    tr_address const* addr)
{
    TR_ASSERT(setmePieces != NULL);
    TR_ASSERT(desiredSetSize <= pieceCount);
    TR_ASSERT(desiredSetSize != 0);
    TR_ASSERT(pieceCount != 0);
    TR_ASSERT(infohash != NULL);
    TR_ASSERT(addr != NULL);

    size_t setSize = 0;

    if (addr->type == TR_AF_INET)
    {
        uint8_t w[SHA_DIGEST_LENGTH + 4];
        uint8_t* walk = w;
        uint8_t x[SHA_DIGEST_LENGTH];

        uint32_t ui32 = ntohl(htonl(addr->addr.addr4.s_addr) & 0xffffff00); /* (1) */
        memcpy(w, &ui32, sizeof(uint32_t));
        walk += sizeof(uint32_t);
        memcpy(walk, infohash, SHA_DIGEST_LENGTH); /* (2) */
        walk += SHA_DIGEST_LENGTH;
        tr_sha1(x, w, walk - w, NULL); /* (3) */
        TR_ASSERT(sizeof(w) == walk - w);

        while (setSize < desiredSetSize)
        {
            for (int i = 0; i < 5 && setSize < desiredSetSize; ++i) /* (4) */
            {
                uint32_t j = i * 4; /* (5) */
                uint32_t y = ntohl(*(uint32_t*)(x + j)); /* (6) */
                uint32_t index = y % pieceCount; /* (7) */
                bool found = false;

                for (size_t k = 0; !found && k < setSize; ++k) /* (8) */
                {
                    found = setmePieces[k] == index;
                }

                if (!found)
                {
                    setmePieces[setSize++] = index; /* (9) */
                }
            }

            tr_sha1(x, x, sizeof(x), NULL); /* (3) */
        }
    }

    return setSize;
}

static void updateFastSet(tr_peerMsgs* msgs UNUSED)
{
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);
    bool const peerIsNeedy = msgs->peer->progress < 0.10;

    if (fext && peerIsNeedy && !msgs->haveFastSet)
    {
        struct tr_address const* addr = tr_peerIoGetAddress(msgs->io, NULL);
        tr_info const* inf = &msgs->torrent->info;
        size_t const numwant = MIN(MAX_FAST_SET_SIZE, inf->pieceCount);

        /* build the fast set */
        msgs->fastsetSize = tr_generateAllowedSet(msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
        msgs->haveFastSet = true;

        /* send it to the peer */
        for (size_t i = 0; i < msgs->fastsetSize; ++i)
        {
            protocolSendAllowedFast(msgs, msgs->fastset[i]);
        }
    }
}

#endif

/***
****  ACTIVE
***/

static bool tr_peerMsgsCalculateActive(tr_peerMsgs const* msgs, tr_direction direction)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));
    TR_ASSERT(tr_isDirection(direction));

    bool is_active;

    if (direction == TR_CLIENT_TO_PEER)
    {
        is_active = tr_peerMsgsIsPeerInterested(msgs) && !tr_peerMsgsIsPeerChoked(msgs);

        /* FIXME: https://trac.transmissionbt.com/ticket/5505
        if (is_active)
        {
            TR_ASSERT(!tr_peerIsSeed(&msgs->peer));
        }
        */
    }
    else /* TR_PEER_TO_CLIENT */
    {
        if (!tr_torrentHasMetadata(msgs->torrent))
        {
            is_active = true;
        }
        else
        {
            is_active = tr_peerMsgsIsClientInterested(msgs) && !tr_peerMsgsIsClientChoked(msgs);

            if (is_active)
            {
                TR_ASSERT(!tr_torrentIsSeed(msgs->torrent));
            }
        }
    }

    return is_active;
}

bool tr_peerMsgsIsActive(tr_peerMsgs const* msgs, tr_direction direction)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));
    TR_ASSERT(tr_isDirection(direction));

    bool is_active = msgs->is_active[direction];

    TR_ASSERT(is_active == tr_peerMsgsCalculateActive(msgs, direction));

    return is_active;
}

static void tr_peerMsgsSetActive(tr_peerMsgs* msgs, tr_direction direction, bool is_active)
{
    dbgmsg(msgs, "direction [%d] is_active [%d]", (int)direction, (int)is_active);

    if (msgs->is_active[direction] != is_active)
    {
        msgs->is_active[direction] = is_active;

        tr_swarmIncrementActivePeers(msgs->torrent->swarm, direction, is_active);
    }
}

void tr_peerMsgsUpdateActive(tr_peerMsgs* msgs, tr_direction direction)
{
    bool const is_active = tr_peerMsgsCalculateActive(msgs, direction);

    tr_peerMsgsSetActive(msgs, direction, is_active);
}

/**
***  INTEREST
**/

static void sendInterest(tr_peerMsgs* msgs, bool b)
{
    TR_ASSERT(msgs != NULL);

    struct evbuffer* out = msgs->outMessages;

    msgs->client_is_interested = b;
    dbgmsg(msgs, "Sending %s", b ? "Interested" : "Not Interested");
    evbuffer_add_uint32(out, sizeof(uint8_t));
    evbuffer_add_uint8(out, b ? BT_INTERESTED : BT_NOT_INTERESTED);

    pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
    dbgOutMessageLen(msgs);
}

static void updateInterest(tr_peerMsgs* msgs UNUSED)
{
    /* FIXME -- might need to poke the mgr on startup */
}

void tr_peerMsgsSetInterested(tr_peerMsgs* msgs, bool b)
{
    if (msgs->client_is_interested != b)
    {
        sendInterest(msgs, b);

        tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT);
    }
}

static bool popNextMetadataRequest(tr_peerMsgs* msgs, int* piece)
{
    if (msgs->peerAskedForMetadataCount == 0)
    {
        return false;
    }

    *piece = msgs->peerAskedForMetadata[0];

    tr_removeElementFromArray(msgs->peerAskedForMetadata, 0, sizeof(int), msgs->peerAskedForMetadataCount);
    --msgs->peerAskedForMetadataCount;

    return true;
}

static bool popNextRequest(tr_peerMsgs* msgs, struct peer_request* setme)
{
    if (msgs->peer.pendingReqsToClient == 0)
    {
        return false;
    }

    *setme = msgs->peerAskedFor[0];

    tr_removeElementFromArray(msgs->peerAskedFor, 0, sizeof(struct peer_request), msgs->peer.pendingReqsToClient);
    --msgs->peer.pendingReqsToClient;

    return true;
}

static void cancelAllRequestsToClient(tr_peerMsgs* msgs)
{
    struct peer_request req;
    bool const mustSendCancel = tr_peerIoSupportsFEXT(msgs->io);

    while (popNextRequest(msgs, &req))
    {
        if (mustSendCancel)
        {
            protocolSendReject(msgs, &req);
        }
    }
}

void tr_peerMsgsSetChoke(tr_peerMsgs* msgs, bool peer_is_choked)
{
    TR_ASSERT(msgs != NULL);

    time_t const now = tr_time();
    time_t const fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;

    if (msgs->chokeChangedAt > fibrillationTime)
    {
        dbgmsg(msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked);
    }
    else if (msgs->peer_is_choked != peer_is_choked)
    {
        msgs->peer_is_choked = peer_is_choked;

        if (peer_is_choked)
        {
            cancelAllRequestsToClient(msgs);
        }

        protocolSendChoke(msgs, peer_is_choked);
        msgs->chokeChangedAt = now;
        tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER);
    }
}

/**
***
**/

void tr_peerMsgsHave(tr_peerMsgs* msgs, uint32_t index)
{
    protocolSendHave(msgs, index);

    /* since we have more pieces now, we might not be interested in this peer */
    updateInterest(msgs);
}

/**
***
**/

static bool reqIsValid(tr_peerMsgs const* peer, uint32_t index, uint32_t offset, uint32_t length)
{
    return tr_torrentReqIsValid(peer->torrent, index, offset, length);
}

static bool requestIsValid(tr_peerMsgs const* msgs, struct peer_request const* req)
{
    return reqIsValid(msgs, req->index, req->offset, req->length);
}

void tr_peerMsgsCancel(tr_peerMsgs* msgs, tr_block_index_t block)
{
    struct peer_request req;
    // fprintf(stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n",
    //     (size_t)block, msgs->peer);
    blockToReq(msgs->torrent, block, &req);
    protocolSendCancel(msgs, &req);
}

/**
***
**/

static void sendLtepHandshake(tr_peerMsgs* msgs)
{
    tr_variant val;
    bool allow_pex;
    bool allow_metadata_xfer;
    struct evbuffer* payload;
    struct evbuffer* out = msgs->outMessages;
    unsigned char const* ipv6 = tr_globalIPv6();
    static tr_quark version_quark = 0;

    if (msgs->clientSentLtepHandshake)
    {
        return;
    }

    if (version_quark == 0)
    {
        version_quark = tr_quark_new(TR_NAME " " USERAGENT_PREFIX, TR_BAD_SIZE);
    }

    dbgmsg(msgs, "sending an ltep handshake");
    msgs->clientSentLtepHandshake = true;

    /* decide if we want to advertise metadata xfer support (BEP 9) */
    if (tr_torrentIsPrivate(msgs->torrent))
    {
        allow_metadata_xfer = false;
    }
    else
    {
        allow_metadata_xfer = true;
    }

    /* decide if we want to advertise pex support */
    if (!tr_torrentAllowsPex(msgs->torrent))
    {
        allow_pex = false;
    }
    else if (msgs->peerSentLtepHandshake)
    {
        allow_pex = msgs->peerSupportsPex;
    }
    else
    {
        allow_pex = true;
    }

    tr_variantInitDict(&val, 8);
    tr_variantDictAddInt(&val, TR_KEY_e, getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED);

    if (ipv6 != NULL)
    {
        tr_variantDictAddRaw(&val, TR_KEY_ipv6, ipv6, 16);
    }

    if (allow_metadata_xfer && tr_torrentHasMetadata(msgs->torrent) && msgs->torrent->infoDictLength > 0)
    {
        tr_variantDictAddInt(&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength);
    }

    tr_variantDictAddInt(&val, TR_KEY_p, tr_sessionGetPublicPeerPort(getSession(msgs)));
    tr_variantDictAddInt(&val, TR_KEY_reqq, REQQ);
    tr_variantDictAddInt(&val, TR_KEY_upload_only, tr_torrentIsSeed(msgs->torrent));
    tr_variantDictAddQuark(&val, TR_KEY_v, version_quark);

    if (allow_metadata_xfer || allow_pex)
    {
        tr_variant* m = tr_variantDictAddDict(&val, TR_KEY_m, 2);

        if (allow_metadata_xfer)
        {
            tr_variantDictAddInt(m, TR_KEY_ut_metadata, UT_METADATA_ID);
        }

        if (allow_pex)
        {
            tr_variantDictAddInt(m, TR_KEY_ut_pex, UT_PEX_ID);
        }
    }

    payload = tr_variantToBuf(&val, TR_VARIANT_FMT_BENC);

    evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
    evbuffer_add_uint8(out, BT_LTEP);
    evbuffer_add_uint8(out, LTEP_HANDSHAKE);
    evbuffer_add_buffer(out, payload);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
    dbgOutMessageLen(msgs);

    /* cleanup */
    evbuffer_free(payload);
    tr_variantFree(&val);
}

static void parseLtepHandshake(tr_peerMsgs* msgs, uint32_t len, struct evbuffer* inbuf)
{
    int64_t i;
    tr_variant val;
    tr_variant* sub;
    uint8_t* tmp = tr_new(uint8_t, len);
    uint8_t const* addr;
    size_t addr_len;
    tr_pex pex;
    int8_t seedProbability = -1;

    memset(&pex, 0, sizeof(tr_pex));

    tr_peerIoReadBytes(msgs->io, inbuf, tmp, len);
    msgs->peerSentLtepHandshake = true;

    if (tr_variantFromBenc(&val, tmp, len) != 0 || !tr_variantIsDict(&val))
    {
        dbgmsg(msgs, "GET  extended-handshake, couldn't get dictionary");
        tr_free(tmp);
        return;
    }

    /* arbitrary limit, should be more than enough */
    if (len <= 4096)
    {
        dbgmsg(msgs, "here is the handshake: [%*.*s]", (int)len, (int)len, tmp);
    }
    else
    {
        dbgmsg(msgs, "handshake length is too big (%" PRIu32 "), printing skipped", len);
    }

    /* does the peer prefer encrypted connections? */
    if (tr_variantDictFindInt(&val, TR_KEY_e, &i))
    {
        msgs->encryption_preference = i != 0 ? ENCRYPTION_PREFERENCE_YES : ENCRYPTION_PREFERENCE_NO;

        if (i != 0)
        {
            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
        }
    }

    /* check supported messages for utorrent pex */
    msgs->peerSupportsPex = false;
    msgs->peerSupportsMetadataXfer = false;

    if (tr_variantDictFindDict(&val, TR_KEY_m, &sub))
    {
        if (tr_variantDictFindInt(sub, TR_KEY_ut_pex, &i))
        {
            msgs->peerSupportsPex = i != 0;
            msgs->ut_pex_id = (uint8_t)i;
            dbgmsg(msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
        }

        if (tr_variantDictFindInt(sub, TR_KEY_ut_metadata, &i))
        {
            msgs->peerSupportsMetadataXfer = i != 0;
            msgs->ut_metadata_id = (uint8_t)i;
            dbgmsg(msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
        }

        if (tr_variantDictFindInt(sub, TR_KEY_ut_holepunch, &i))
        {
            /* Mysterious µTorrent extension that we don't grok.  However,
               it implies support for µTP, so use it to indicate that. */
            tr_peerMgrSetUtpFailed(msgs->torrent, tr_peerIoGetAddress(msgs->io, NULL), false);
        }
    }

    /* look for metainfo size (BEP 9) */
    if (tr_variantDictFindInt(&val, TR_KEY_metadata_size, &i))
    {
        if (tr_torrentSetMetadataSizeHint(msgs->torrent, i))
        {
            msgs->metadata_size_hint = (size_t)i;
        }
    }

    /* look for upload_only (BEP 21) */
    if (tr_variantDictFindInt(&val, TR_KEY_upload_only, &i))
    {
        seedProbability = i == 0 ? 0 : 100;
    }

    /* get peer's listening port */
    if (tr_variantDictFindInt(&val, TR_KEY_p, &i))
    {
        pex.port = htons((uint16_t)i);
        fireClientGotPort(msgs, pex.port);
        dbgmsg(msgs, "peer's port is now %d", (int)i);
    }

    if (tr_peerIoIsIncoming(msgs->io) && tr_variantDictFindRaw(&val, TR_KEY_ipv4, &addr, &addr_len) && addr_len == 4)
    {
        pex.addr.type = TR_AF_INET;
        memcpy(&pex.addr.addr.addr4, addr, 4);
        tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
    }

    if (tr_peerIoIsIncoming(msgs->io) && tr_variantDictFindRaw(&val, TR_KEY_ipv6, &addr, &addr_len) && addr_len == 16)
    {
        pex.addr.type = TR_AF_INET6;
        memcpy(&pex.addr.addr.addr6, addr, 16);
        tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
    }

    /* get peer's maximum request queue size */
    if (tr_variantDictFindInt(&val, TR_KEY_reqq, &i))
    {
        msgs->reqq = i;
    }

    tr_variantFree(&val);
    tr_free(tmp);
}

static void parseUtMetadata(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf)
{
    tr_variant dict;
    char* msg_end;
    char const* benc_end;
    int64_t msg_type = -1;
    int64_t piece = -1;
    int64_t total_size = 0;
    uint8_t* tmp = tr_new(uint8_t, msglen);

    tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen);
    msg_end = (char*)tmp + msglen;

    if (tr_variantFromBencFull(&dict, tmp, msglen, NULL, &benc_end) == 0)
    {
        (void)tr_variantDictFindInt(&dict, TR_KEY_msg_type, &msg_type);
        (void)tr_variantDictFindInt(&dict, TR_KEY_piece, &piece);
        (void)tr_variantDictFindInt(&dict, TR_KEY_total_size, &total_size);
        tr_variantFree(&dict);
    }

    dbgmsg(msgs, "got ut_metadata msg: type %d, piece %d, total_size %d", (int)msg_type, (int)piece, (int)total_size);

    if (msg_type == METADATA_MSG_TYPE_REJECT)
    {
        /* NOOP */
    }

    if (msg_type == METADATA_MSG_TYPE_DATA && !tr_torrentHasMetadata(msgs->torrent) &&
        msg_end - benc_end <= METADATA_PIECE_SIZE && piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size)
    {
        int const pieceLen = msg_end - benc_end;
        tr_torrentSetMetadataPiece(msgs->torrent, piece, benc_end, pieceLen);
    }

    if (msg_type == METADATA_MSG_TYPE_REQUEST)
    {
        if (piece >= 0 && tr_torrentHasMetadata(msgs->torrent) && !tr_torrentIsPrivate(msgs->torrent) &&
            msgs->peerAskedForMetadataCount < METADATA_REQQ)
        {
            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
        }
        else
        {
            tr_variant tmp;
            struct evbuffer* payload;
            struct evbuffer* out = msgs->outMessages;

            /* build the rejection message */
            tr_variantInitDict(&tmp, 2);
            tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
            tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
            payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);

            /* write it out as a LTEP message to our outMessages buffer */
            evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
            evbuffer_add_uint8(out, BT_LTEP);
            evbuffer_add_uint8(out, msgs->ut_metadata_id);
            evbuffer_add_buffer(out, payload);
            pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
            dbgOutMessageLen(msgs);

            /* cleanup */
            evbuffer_free(payload);
            tr_variantFree(&tmp);
        }
    }

    tr_free(tmp);
}

static void parseUtPex(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf)
{
    tr_torrent* tor = msgs->torrent;
    if (!tr_torrentAllowsPex(tor))
    {
        return;
    }

    uint8_t* tmp = tr_new(uint8_t, msglen);
    tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen);

    tr_variant val;
    bool const loaded = tr_variantFromBenc(&val, tmp, msglen) == 0;

    tr_free(tmp);

    if (!loaded)
    {
        return;
    }

    uint8_t const* added;
    size_t added_len;

    if (tr_variantDictFindRaw(&val, TR_KEY_added, &added, &added_len))
    {
        tr_pex* pex;
        size_t n;
        size_t added_f_len;
        uint8_t const* added_f;

        if (!tr_variantDictFindRaw(&val, TR_KEY_added_f, &added_f, &added_f_len))
        {
            added_f_len = 0;
            added_f = NULL;
        }

        pex = tr_peerMgrCompactToPex(added, added_len, added_f, added_f_len, &n);

        n = MIN(n, MAX_PEX_PEER_COUNT);

        for (size_t i = 0; i < n; ++i)
        {
            int seedProbability = -1;

            if (i < added_f_len)
            {
                seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) != 0 ? 100 : 0;
            }

            tr_peerMgrAddPex(tor, TR_PEER_FROM_PEX, pex + i, seedProbability);
        }

        tr_free(pex);
    }

    if (tr_variantDictFindRaw(&val, TR_KEY_added6, &added, &added_len))
    {
        tr_pex* pex;
        size_t n;
        size_t added_f_len;
        uint8_t const* added_f;

        if (!tr_variantDictFindRaw(&val, TR_KEY_added6_f, &added_f, &added_f_len))
        {
            added_f_len = 0;
            added_f = NULL;
        }

        pex = tr_peerMgrCompact6ToPex(added, added_len, added_f, added_f_len, &n);

        n = MIN(n, MAX_PEX_PEER_COUNT);

        for (size_t i = 0; i < n; ++i)
        {
            int seedProbability = -1;

            if (i < added_f_len)
            {
                seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) != 0 ? 100 : 0;
            }

            tr_peerMgrAddPex(tor, TR_PEER_FROM_PEX, pex + i, seedProbability);
        }

        tr_free(pex);
    }

    tr_variantFree(&val);
}

static void sendPex(tr_peerMsgs* msgs);

static void parseLtep(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf)
{
    TR_ASSERT(msglen > 0);

    uint8_t ltep_msgid;

    tr_peerIoReadUint8(msgs->io, inbuf, &ltep_msgid);
    msglen--;

    if (ltep_msgid == LTEP_HANDSHAKE)
    {
        dbgmsg(msgs, "got ltep handshake");
        parseLtepHandshake(msgs, msglen, inbuf);

        if (tr_peerIoSupportsLTEP(msgs->io))
        {
            sendLtepHandshake(msgs);
            sendPex(msgs);
        }
    }
    else if (ltep_msgid == UT_PEX_ID)
    {
        dbgmsg(msgs, "got ut pex");
        msgs->peerSupportsPex = true;
        parseUtPex(msgs, msglen, inbuf);
    }
    else if (ltep_msgid == UT_METADATA_ID)
    {
        dbgmsg(msgs, "got ut metadata");
        msgs->peerSupportsMetadataXfer = true;
        parseUtMetadata(msgs, msglen, inbuf);
    }
    else
    {
        dbgmsg(msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);
        evbuffer_drain(inbuf, msglen);
    }
}

static int readBtLength(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen)
{
    uint32_t len;

    if (inlen < sizeof(len))
    {
        return READ_LATER;
    }

    tr_peerIoReadUint32(msgs->io, inbuf, &len);

    if (len == 0) /* peer sent us a keepalive message */
    {
        dbgmsg(msgs, "got KeepAlive");
    }
    else
    {
        msgs->incoming.length = len;
        msgs->state = AWAITING_BT_ID;
    }

    return READ_NOW;
}

static int readBtMessage(tr_peerMsgs*, struct evbuffer*, size_t);

static int readBtId(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen)
{
    uint8_t id;

    if (inlen < sizeof(uint8_t))
    {
        return READ_LATER;
    }

    tr_peerIoReadUint8(msgs->io, inbuf, &id);
    msgs->incoming.id = id;
    dbgmsg(msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length);

    if (id == BT_PIECE)
    {
        msgs->state = AWAITING_BT_PIECE;
        return READ_NOW;
    }
    else if (msgs->incoming.length != 1)
    {
        msgs->state = AWAITING_BT_MESSAGE;
        return READ_NOW;
    }
    else
    {
        return readBtMessage(msgs, inbuf, inlen - 1);
    }
}

static void updatePeerProgress(tr_peerMsgs* msgs)
{
    tr_peerUpdateProgress(msgs->torrent, &msgs->peer);

    /* updateFastSet(msgs); */
    updateInterest(msgs);
}

static void prefetchPieces(tr_peerMsgs* msgs)
{
    if (!getSession(msgs)->isPrefetchEnabled)
    {
        return;
    }

    for (int i = msgs->prefetchCount; i < msgs->peer.pendingReqsToClient && i < PREFETCH_SIZE; ++i)
    {
        struct peer_request const* req = msgs->peerAskedFor + i;

        if (requestIsValid(msgs, req))
        {
            tr_cachePrefetchBlock(getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length);
            ++msgs->prefetchCount;
        }
    }
}

static void peerMadeRequest(tr_peerMsgs* msgs, struct peer_request const* req)
{
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);
    bool const reqIsValid = requestIsValid(msgs, req);
    bool const clientHasPiece = reqIsValid && tr_torrentPieceIsComplete(msgs->torrent, req->index);
    bool const peerIsChoked = msgs->peer_is_choked;

    bool allow = false;

    if (!reqIsValid)
    {
        dbgmsg(msgs, "rejecting an invalid request.");
    }
    else if (!clientHasPiece)
    {
        dbgmsg(msgs, "rejecting request for a piece we don't have.");
    }
    else if (peerIsChoked)
    {
        dbgmsg(msgs, "rejecting request from choked peer");
    }
    else if (msgs->peer.pendingReqsToClient + 1 >= REQQ)
    {
        dbgmsg(msgs, "rejecting request ... reqq is full");
    }
    else
    {
        allow = true;
    }

    if (allow)
    {
        msgs->peerAskedFor[msgs->peer.pendingReqsToClient++] = *req;
        prefetchPieces(msgs);
    }
    else if (fext)
    {
        protocolSendReject(msgs, req);
    }
}

static bool messageLengthIsCorrect(tr_peerMsgs const* msg, uint8_t id, uint32_t len)
{
    switch (id)
    {
    case BT_CHOKE:
    case BT_UNCHOKE:
    case BT_INTERESTED:
    case BT_NOT_INTERESTED:
    case BT_FEXT_HAVE_ALL:
    case BT_FEXT_HAVE_NONE:
        return len == 1;

    case BT_HAVE:
    case BT_FEXT_SUGGEST:
    case BT_FEXT_ALLOWED_FAST:
        return len == 5;

    case BT_BITFIELD:
        if (tr_torrentHasMetadata(msg->torrent))
        {
            return len == (msg->torrent->info.pieceCount >> 3) + ((msg->torrent->info.pieceCount & 7) != 0 ? 1 : 0) + 1U;
        }

        /* we don't know the piece count yet,
           so we can only guess whether to send true or false */
        if (msg->metadata_size_hint > 0)
        {
            return len <= msg->metadata_size_hint;
        }

        return true;

    case BT_REQUEST:
    case BT_CANCEL:
    case BT_FEXT_REJECT:
        return len == 13;

    case BT_PIECE:
        return len > 9 && len <= 16393;

    case BT_PORT:
        return len == 3;

    case BT_LTEP:
        return len >= 2;

    default:
        return false;
    }
}

static int clientGotBlock(tr_peerMsgs* msgs, struct evbuffer* block, struct peer_request const* req);

static int readBtPiece(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen, size_t* setme_piece_bytes_read)
{
    TR_ASSERT(evbuffer_get_length(inbuf) >= inlen);

    dbgmsg(msgs, "In readBtPiece");

    struct peer_request* req = &msgs->incoming.blockReq;

    if (req->length == 0)
    {
        if (inlen < 8)
        {
            return READ_LATER;
        }

        tr_peerIoReadUint32(msgs->io, inbuf, &req->index);
        tr_peerIoReadUint32(msgs->io, inbuf, &req->offset);
        req->length = msgs->incoming.length - 9;
        dbgmsg(msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
        return READ_NOW;
    }
    else
    {
        int err;
        size_t n;
        size_t nLeft;
        struct evbuffer* block_buffer;

        if (msgs->incoming.block == NULL)
        {
            msgs->incoming.block = evbuffer_new();
        }

        block_buffer = msgs->incoming.block;

        /* read in another chunk of data */
        nLeft = req->length - evbuffer_get_length(block_buffer);
        n = MIN(nLeft, inlen);

        tr_peerIoReadBytesToBuf(msgs->io, inbuf, block_buffer, n);

        fireClientGotPieceData(msgs, n);
        *setme_piece_bytes_read += n;
        dbgmsg(msgs, "got %zu bytes for block %u:%u->%u ... %d remain", n, req->index, req->offset, req->length,
            (int)(req->length - evbuffer_get_length(block_buffer)));

        if (evbuffer_get_length(block_buffer) < req->length)
        {
            return READ_LATER;
        }

        /* pass the block along... */
        err = clientGotBlock(msgs, block_buffer, req);
        evbuffer_drain(block_buffer, evbuffer_get_length(block_buffer));

        /* cleanup */
        req->length = 0;
        msgs->state = AWAITING_BT_LENGTH;
        return err != 0 ? READ_ERR : READ_NOW;
    }
}

static void updateDesiredRequestCount(tr_peerMsgs* msgs);

static int readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen)
{
    uint8_t const id = msgs->incoming.id;
#ifdef TR_ENABLE_ASSERTS
    size_t const startBufLen = evbuffer_get_length(inbuf);
#endif
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);

    uint32_t ui32;
    uint32_t msglen = msgs->incoming.length;

    TR_ASSERT(msglen > 0);

    --msglen; /* id length */

    dbgmsg(msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen);

    if (inlen < msglen)
    {
        return READ_LATER;
    }

    if (!messageLengthIsCorrect(msgs, id, msglen + 1))
    {
        dbgmsg(msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen);
        fireError(msgs, EMSGSIZE);
        return READ_ERR;
    }

    switch (id)
    {
    case BT_CHOKE:
        dbgmsg(msgs, "got Choke");
        msgs->client_is_choked = true;

        if (!fext)
        {
            fireGotChoke(msgs);
        }

        tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT);
        break;

    case BT_UNCHOKE:
        dbgmsg(msgs, "got Unchoke");
        msgs->client_is_choked = false;
        tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT);
        updateDesiredRequestCount(msgs);
        break;

    case BT_INTERESTED:
        dbgmsg(msgs, "got Interested");
        msgs->peer_is_interested = true;
        tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER);
        break;

    case BT_NOT_INTERESTED:
        dbgmsg(msgs, "got Not Interested");
        msgs->peer_is_interested = false;
        tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER);
        break;

    case BT_HAVE:
        tr_peerIoReadUint32(msgs->io, inbuf, &ui32);
        dbgmsg(msgs, "got Have: %u", ui32);

        if (tr_torrentHasMetadata(msgs->torrent) && ui32 >= msgs->torrent->info.pieceCount)
        {
            fireError(msgs, ERANGE);
            return READ_ERR;
        }

        /* a peer can send the same HAVE message twice... */
        if (!tr_bitfieldHas(&msgs->peer.have, ui32))
        {
            tr_bitfieldAdd(&msgs->peer.have, ui32);
            fireClientGotHave(msgs, ui32);
        }

        updatePeerProgress(msgs);
        break;

    case BT_BITFIELD:
        {
            uint8_t* tmp = tr_new(uint8_t, msglen);
            dbgmsg(msgs, "got a bitfield");
            tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen);
            tr_bitfieldSetRaw(&msgs->peer.have, tmp, msglen, tr_torrentHasMetadata(msgs->torrent));
            fireClientGotBitfield(msgs, &msgs->peer.have);
            updatePeerProgress(msgs);
            tr_free(tmp);
            break;
        }

    case BT_REQUEST:
        {
            struct peer_request r;
            tr_peerIoReadUint32(msgs->io, inbuf, &r.index);
            tr_peerIoReadUint32(msgs->io, inbuf, &r.offset);
            tr_peerIoReadUint32(msgs->io, inbuf, &r.length);
            dbgmsg(msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length);
            peerMadeRequest(msgs, &r);
            break;
        }

    case BT_CANCEL:
        {
            struct peer_request r;
            tr_peerIoReadUint32(msgs->io, inbuf, &r.index);
            tr_peerIoReadUint32(msgs->io, inbuf, &r.offset);
            tr_peerIoReadUint32(msgs->io, inbuf, &r.length);
            tr_historyAdd(&msgs->peer.cancelsSentToClient, tr_time(), 1);
            dbgmsg(msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length);

            for (int i = 0; i < msgs->peer.pendingReqsToClient; ++i)
            {
                struct peer_request const* req = msgs->peerAskedFor + i;

                if (req->index == r.index && req->offset == r.offset && req->length == r.length)
                {
                    tr_removeElementFromArray(msgs->peerAskedFor, i, sizeof(struct peer_request),
                        msgs->peer.pendingReqsToClient);
                    --msgs->peer.pendingReqsToClient;
                    break;
                }
            }

            break;
        }

    case BT_PIECE:
        TR_ASSERT(false); /* handled elsewhere! */
        break;

    case BT_PORT:
        dbgmsg(msgs, "Got a BT_PORT");
        tr_peerIoReadUint16(msgs->io, inbuf, &msgs->dht_port);

        if (msgs->dht_port > 0)
        {
            tr_dhtAddNode(getSession(msgs), tr_peerAddress(&msgs->peer), msgs->dht_port, 0);
        }

        break;

    case BT_FEXT_SUGGEST:
        dbgmsg(msgs, "Got a BT_FEXT_SUGGEST");
        tr_peerIoReadUint32(msgs->io, inbuf, &ui32);

        if (fext)
        {
            fireClientGotSuggest(msgs, ui32);
        }
        else
        {
            fireError(msgs, EMSGSIZE);
            return READ_ERR;
        }

        break;

    case BT_FEXT_ALLOWED_FAST:
        dbgmsg(msgs, "Got a BT_FEXT_ALLOWED_FAST");
        tr_peerIoReadUint32(msgs->io, inbuf, &ui32);

        if (fext)
        {
            fireClientGotAllowedFast(msgs, ui32);
        }
        else
        {
            fireError(msgs, EMSGSIZE);
            return READ_ERR;
        }

        break;

    case BT_FEXT_HAVE_ALL:
        dbgmsg(msgs, "Got a BT_FEXT_HAVE_ALL");

        if (fext)
        {
            tr_bitfieldSetHasAll(&msgs->peer.have);
            TR_ASSERT(tr_bitfieldHasAll(&msgs->peer.have));
            fireClientGotHaveAll(msgs);
            updatePeerProgress(msgs);
        }
        else
        {
            fireError(msgs, EMSGSIZE);
            return READ_ERR;
        }

        break;

    case BT_FEXT_HAVE_NONE:
        dbgmsg(msgs, "Got a BT_FEXT_HAVE_NONE");

        if (fext)
        {
            tr_bitfieldSetHasNone(&msgs->peer.have);
            fireClientGotHaveNone(msgs);
            updatePeerProgress(msgs);
        }
        else
        {
            fireError(msgs, EMSGSIZE);
            return READ_ERR;
        }

        break;

    case BT_FEXT_REJECT:
        {
            struct peer_request r;
            dbgmsg(msgs, "Got a BT_FEXT_REJECT");
            tr_peerIoReadUint32(msgs->io, inbuf, &r.index);
            tr_peerIoReadUint32(msgs->io, inbuf, &r.offset);
            tr_peerIoReadUint32(msgs->io, inbuf, &r.length);

            if (fext)
            {
                fireGotRej(msgs, &r);
            }
            else
            {
                fireError(msgs, EMSGSIZE);
                return READ_ERR;
            }

            break;
        }

    case BT_LTEP:
        dbgmsg(msgs, "Got a BT_LTEP");
        parseLtep(msgs, msglen, inbuf);
        break;

    default:
        dbgmsg(msgs, "peer sent us an UNKNOWN: %d", (int)id);
        tr_peerIoDrain(msgs->io, inbuf, msglen);
        break;
    }

    TR_ASSERT(msglen + 1 == msgs->incoming.length);
    TR_ASSERT(evbuffer_get_length(inbuf) == startBufLen - msglen);

    msgs->state = AWAITING_BT_LENGTH;
    return READ_NOW;
}

/* returns 0 on success, or an errno on failure */
static int clientGotBlock(tr_peerMsgs* msgs, struct evbuffer* data, struct peer_request const* req)
{
    TR_ASSERT(msgs != NULL);
    TR_ASSERT(req != NULL);

    int err;
    tr_torrent* tor = msgs->torrent;
    tr_block_index_t const block = _tr_block(tor, req->index, req->offset);

    if (!requestIsValid(msgs, req))
    {
        dbgmsg(msgs, "dropping invalid block %u:%u->%u", req->index, req->offset, req->length);
        return EBADMSG;
    }

    if (req->length != tr_torBlockCountBytes(msgs->torrent, block))
    {
        dbgmsg(msgs, "wrong block size -- expected %u, got %d", tr_torBlockCountBytes(msgs->torrent, block), req->length);
        return EMSGSIZE;
    }

    dbgmsg(msgs, "got block %u:%u->%u", req->index, req->offset, req->length);

    if (!tr_peerMgrDidPeerRequest(msgs->torrent, &msgs->peer, block))
    {
        dbgmsg(msgs, "we didn't ask for this message...");
        return 0;
    }

    if (tr_torrentPieceIsComplete(msgs->torrent, req->index))
    {
        dbgmsg(msgs, "we did ask for this message, but the piece is already complete...");
        return 0;
    }

    /**
    ***  Save the block
    **/

    if ((err = tr_cacheWriteBlock(getSession(msgs)->cache, tor, req->index, req->offset, req->length, data)) != 0)
    {
        return err;
    }

    tr_bitfieldAdd(&msgs->peer.blame, req->index);
    fireGotBlock(msgs, req);
    return 0;
}

static int peerPulse(void* vmsgs);

static void didWrite(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void* vmsgs)
{
    tr_peerMsgs* msgs = vmsgs;

    if (wasPieceData)
    {
        firePeerGotPieceData(msgs, bytesWritten);
    }

    if (tr_isPeerIo(io) && io->userData != NULL)
    {
        peerPulse(msgs);
    }
}

static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece)
{
    ReadState ret;
    tr_peerMsgs* msgs = vmsgs;
    struct evbuffer* in = tr_peerIoGetReadBuffer(io);
    size_t const inlen = evbuffer_get_length(in);

    dbgmsg(msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state);

    if (inlen == 0)
    {
        ret = READ_LATER;
    }
    else if (msgs->state == AWAITING_BT_PIECE)
    {
        ret = readBtPiece(msgs, in, inlen, piece);
    }
    else
    {
        switch (msgs->state)
        {
        case AWAITING_BT_LENGTH:
            ret = readBtLength(msgs, in, inlen);
            break;

        case AWAITING_BT_ID:
            ret = readBtId(msgs, in, inlen);
            break;

        case AWAITING_BT_MESSAGE:
            ret = readBtMessage(msgs, in, inlen);
            break;

        default:
            ret = READ_ERR;
            TR_ASSERT_MSG(false, "unhandled peer messages state %d", (int)msgs->state);
        }
    }

    dbgmsg(msgs, "canRead: ret is %d", (int)ret);

    return ret;
}

int tr_peerMsgsIsReadingBlock(tr_peerMsgs const* msgs, tr_block_index_t block)
{
    if (msgs->state != AWAITING_BT_PIECE)
    {
        return false;
    }

    return block == _tr_block(msgs->torrent, msgs->incoming.blockReq.index, msgs->incoming.blockReq.offset);
}

/**
***
**/

static void updateDesiredRequestCount(tr_peerMsgs* msgs)
{
    tr_torrent* const torrent = msgs->torrent;

    /* there are lots of reasons we might not want to request any blocks... */
    if (tr_torrentIsSeed(torrent) || !tr_torrentHasMetadata(torrent) || msgs->client_is_choked || !msgs->client_is_interested)
    {
        msgs->desiredRequestCount = 0;
    }
    else
    {
        int estimatedBlocksInPeriod;
        unsigned int rate_Bps;
        unsigned int irate_Bps;
        int const floor = 4;
        int const seconds = REQUEST_BUF_SECS;
        uint64_t const now = tr_time_msec();

        /* Get the rate limit we should use.
         * FIXME: this needs to consider all the other peers as well... */
        rate_Bps = tr_peerGetPieceSpeed_Bps(&msgs->peer, now, TR_PEER_TO_CLIENT);

        if (tr_torrentUsesSpeedLimit(torrent, TR_PEER_TO_CLIENT))
        {
            rate_Bps = MIN(rate_Bps, tr_torrentGetSpeedLimit_Bps(torrent, TR_PEER_TO_CLIENT));
        }

        /* honor the session limits, if enabled */
        if (tr_torrentUsesSessionLimits(torrent) &&
            tr_sessionGetActiveSpeedLimit_Bps(torrent->session, TR_PEER_TO_CLIENT, &irate_Bps))
        {
            rate_Bps = MIN(rate_Bps, irate_Bps);
        }

        /* use this desired rate to figure out how
         * many requests we should send to this peer */
        estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
        msgs->desiredRequestCount = MAX(floor, estimatedBlocksInPeriod);

        /* honor the peer's maximum request count, if specified */
        if (msgs->reqq > 0)
        {
            if (msgs->desiredRequestCount > msgs->reqq)
            {
                msgs->desiredRequestCount = msgs->reqq;
            }
        }
    }
}

static void updateMetadataRequests(tr_peerMsgs* msgs, time_t now)
{
    int piece;

    if (msgs->peerSupportsMetadataXfer && tr_torrentGetNextMetadataRequest(msgs->torrent, now, &piece))
    {
        tr_variant tmp;
        struct evbuffer* payload;
        struct evbuffer* out = msgs->outMessages;

        /* build the data message */
        tr_variantInitDict(&tmp, 3);
        tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST);
        tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
        payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);

        dbgmsg(msgs, "requesting metadata piece #%d", piece);

        /* write it out as a LTEP message to our outMessages buffer */
        evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
        evbuffer_add_uint8(out, BT_LTEP);
        evbuffer_add_uint8(out, msgs->ut_metadata_id);
        evbuffer_add_buffer(out, payload);
        pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
        dbgOutMessageLen(msgs);

        /* cleanup */
        evbuffer_free(payload);
        tr_variantFree(&tmp);
    }
}

static void updateBlockRequests(tr_peerMsgs* msgs)
{
    if (tr_torrentIsPieceTransferAllowed(msgs->torrent, TR_PEER_TO_CLIENT) && msgs->desiredRequestCount > 0 &&
        msgs->peer.pendingReqsToPeer <= msgs->desiredRequestCount * 0.66)
    {
        TR_ASSERT(tr_peerMsgsIsClientInterested(msgs));
        TR_ASSERT(!tr_peerMsgsIsClientChoked(msgs));

        int n;
        tr_block_index_t* blocks;
        int const numwant = msgs->desiredRequestCount - msgs->peer.pendingReqsToPeer;

        blocks = tr_new(tr_block_index_t, numwant);
        tr_peerMgrGetNextRequests(msgs->torrent, &msgs->peer, numwant, blocks, &n, false);

        for (int i = 0; i < n; ++i)
        {
            struct peer_request req;
            blockToReq(msgs->torrent, blocks[i], &req);
            protocolSendRequest(msgs, &req);
        }

        tr_free(blocks);
    }
}

static size_t fillOutputBuffer(tr_peerMsgs* msgs, time_t now)
{
    int piece;
    size_t bytesWritten = 0;
    struct peer_request req;
    bool const haveMessages = evbuffer_get_length(msgs->outMessages) != 0;
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);

    /**
    ***  Protocol messages
    **/

    if (haveMessages && msgs->outMessagesBatchedAt == 0) /* fresh batch */
    {
        dbgmsg(msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length(msgs->outMessages));
        msgs->outMessagesBatchedAt = now;
    }
    else if (haveMessages && now - msgs->outMessagesBatchedAt >= msgs->outMessagesBatchPeriod)
    {
        size_t const len = evbuffer_get_length(msgs->outMessages);
        /* flush the protocol messages */
        dbgmsg(msgs, "flushing outMessages... to %p (length is %zu)", (void*)msgs->io, len);
        tr_peerIoWriteBuf(msgs->io, msgs->outMessages, false);
        msgs->clientSentAnythingAt = now;
        msgs->outMessagesBatchedAt = 0;
        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
        bytesWritten += len;
    }

    /**
    ***  Metadata Pieces
    **/

    if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece))
    {
        char* data;
        size_t dataLen;
        bool ok = false;

        data = tr_torrentGetMetadataPiece(msgs->torrent, piece, &dataLen);

        if (data != NULL)
        {
            tr_variant tmp;
            struct evbuffer* payload;
            struct evbuffer* out = msgs->outMessages;

            /* build the data message */
            tr_variantInitDict(&tmp, 3);
            tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA);
            tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
            tr_variantDictAddInt(&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength);
            payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);

            /* write it out as a LTEP message to our outMessages buffer */
            evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload) + dataLen);
            evbuffer_add_uint8(out, BT_LTEP);
            evbuffer_add_uint8(out, msgs->ut_metadata_id);
            evbuffer_add_buffer(out, payload);
            evbuffer_add(out, data, dataLen);
            pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
            dbgOutMessageLen(msgs);

            evbuffer_free(payload);
            tr_variantFree(&tmp);
            tr_free(data);

            ok = true;
        }

        if (!ok) /* send a rejection message */
        {
            tr_variant tmp;
            struct evbuffer* payload;
            struct evbuffer* out = msgs->outMessages;

            /* build the rejection message */
            tr_variantInitDict(&tmp, 2);
            tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
            tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
            payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);

            /* write it out as a LTEP message to our outMessages buffer */
            evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
            evbuffer_add_uint8(out, BT_LTEP);
            evbuffer_add_uint8(out, msgs->ut_metadata_id);
            evbuffer_add_buffer(out, payload);
            pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
            dbgOutMessageLen(msgs);

            evbuffer_free(payload);
            tr_variantFree(&tmp);
        }
    }

    /**
    ***  Data Blocks
    **/

    if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= msgs->torrent->blockSize && popNextRequest(msgs, &req))
    {
        --msgs->prefetchCount;

        if (requestIsValid(msgs, &req) && tr_torrentPieceIsComplete(msgs->torrent, req.index))
        {
            int err;
            uint32_t const msglen = 4 + 1 + 4 + 4 + req.length;
            struct evbuffer* out;
            struct evbuffer_iovec iovec[1];

            out = evbuffer_new();
            evbuffer_expand(out, msglen);

            evbuffer_add_uint32(out, sizeof(uint8_t) + 2 * sizeof(uint32_t) + req.length);
            evbuffer_add_uint8(out, BT_PIECE);
            evbuffer_add_uint32(out, req.index);
            evbuffer_add_uint32(out, req.offset);

            evbuffer_reserve_space(out, req.length, iovec, 1);
            err = tr_cacheReadBlock(getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length,
                iovec[0].iov_base);
            iovec[0].iov_len = req.length;
            evbuffer_commit_space(out, iovec, 1);

            /* check the piece if it needs checking... */
            if (err == 0 && tr_torrentPieceNeedsCheck(msgs->torrent, req.index))
            {
                if ((err = !tr_torrentCheckPiece(msgs->torrent, req.index)) != 0)
                {
                    tr_torrentSetLocalError(msgs->torrent, _("Please Verify Local Data! Piece #%zu is corrupt."),
                        (size_t)req.index);
                }
            }

            if (err != 0)
            {
                if (fext)
                {
                    protocolSendReject(msgs, &req);
                }
            }
            else
            {
                size_t const n = evbuffer_get_length(out);
                dbgmsg(msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
                TR_ASSERT(n == msglen);
                tr_peerIoWriteBuf(msgs->io, out, true);
                bytesWritten += n;
                msgs->clientSentAnythingAt = now;
                tr_historyAdd(&msgs->peer.blocksSentToPeer, tr_time(), 1);
            }

            evbuffer_free(out);

            if (err != 0)
            {
                bytesWritten = 0;
                msgs = NULL;
            }
        }
        else if (fext) /* peer needs a reject message */
        {
            protocolSendReject(msgs, &req);
        }

        if (msgs != NULL)
        {
            prefetchPieces(msgs);
        }
    }

    /**
    ***  Keepalive
    **/

    if (msgs != NULL && msgs->clientSentAnythingAt != 0 && now - msgs->clientSentAnythingAt > KEEPALIVE_INTERVAL_SECS)
    {
        dbgmsg(msgs, "sending a keepalive message");
        evbuffer_add_uint32(msgs->outMessages, 0);
        pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
    }

    return bytesWritten;
}

static int peerPulse(void* vmsgs)
{
    tr_peerMsgs* msgs = vmsgs;
    time_t const now = tr_time();

    if (tr_isPeerIo(msgs->io))
    {
        updateDesiredRequestCount(msgs);
        updateBlockRequests(msgs);
        updateMetadataRequests(msgs, now);
    }

    for (;;)
    {
        if (fillOutputBuffer(msgs, now) < 1)
        {
            break;
        }
    }

    return true; /* loop forever */
}

void tr_peerMsgsPulse(tr_peerMsgs* msgs)
{
    if (msgs != NULL)
    {
        peerPulse(msgs);
    }
}

static void gotError(tr_peerIo* io UNUSED, short what, void* vmsgs)
{
    if ((what & BEV_EVENT_TIMEOUT) != 0)
    {
        dbgmsg(vmsgs, "libevent got a timeout, what=%hd", what);
    }

    if ((what & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) != 0)
    {
        dbgmsg(vmsgs, "libevent got an error! what=%hd, errno=%d (%s)", what, errno, tr_strerror(errno));
    }

    fireError(vmsgs, ENOTCONN);
}

static void sendBitfield(tr_peerMsgs* msgs)
{
    TR_ASSERT(tr_torrentHasMetadata(msgs->torrent));

    void* bytes;
    size_t byte_count = 0;
    struct evbuffer* out = msgs->outMessages;

    bytes = tr_torrentCreatePieceBitfield(msgs->torrent, &byte_count);
    evbuffer_add_uint32(out, sizeof(uint8_t) + byte_count);
    evbuffer_add_uint8(out, BT_BITFIELD);
    evbuffer_add(out, bytes, byte_count);
    dbgmsg(msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length(out));
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);

    tr_free(bytes);
}

static void tellPeerWhatWeHave(tr_peerMsgs* msgs)
{
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);

    if (fext && tr_torrentHasAll(msgs->torrent))
    {
        protocolSendHaveAll(msgs);
    }
    else if (fext && tr_torrentHasNone(msgs->torrent))
    {
        protocolSendHaveNone(msgs);
    }
    else if (!tr_torrentHasNone(msgs->torrent))
    {
        sendBitfield(msgs);
    }
}

/**
***
**/

/* some peers give us error messages if we send
   more than this many peers in a single pex message
   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
#define MAX_PEX_ADDED 50
#define MAX_PEX_DROPPED 50

typedef struct
{
    tr_pex* added;
    tr_pex* dropped;
    tr_pex* elements;
    int addedCount;
    int droppedCount;
    int elementCount;
}
PexDiffs;

static void pexAddedCb(void* vpex, void* userData)
{
    PexDiffs* diffs = userData;
    tr_pex* pex = vpex;

    if (diffs->addedCount < MAX_PEX_ADDED)
    {
        diffs->added[diffs->addedCount++] = *pex;
        diffs->elements[diffs->elementCount++] = *pex;
    }
}

static inline void pexDroppedCb(void* vpex, void* userData)
{
    PexDiffs* diffs = userData;
    tr_pex* pex = vpex;

    if (diffs->droppedCount < MAX_PEX_DROPPED)
    {
        diffs->dropped[diffs->droppedCount++] = *pex;
    }
}

static inline void pexElementCb(void* vpex, void* userData)
{
    PexDiffs* diffs = userData;
    tr_pex* pex = vpex;

    diffs->elements[diffs->elementCount++] = *pex;
}

typedef void (* tr_set_func)(void* element, void* userData);

/**
 * @brief find the differences and commonalities in two sorted sets
 * @param a the first set
 * @param aCount the number of elements in the set 'a'
 * @param b the second set
 * @param bCount the number of elements in the set 'b'
 * @param compare the sorting method for both sets
 * @param elementSize the sizeof the element in the two sorted sets
 * @param in_a called for items in set 'a' but not set 'b'
 * @param in_b called for items in set 'b' but not set 'a'
 * @param in_both called for items that are in both sets
 * @param userData user data passed along to in_a, in_b, and in_both
 */
static void tr_set_compare(void const* va, size_t aCount, void const* vb, size_t bCount, tr_voidptr_compare_func compare,
    size_t elementSize, tr_set_func in_a_cb, tr_set_func in_b_cb, tr_set_func in_both_cb, void* userData)
{
    uint8_t const* a = va;
    uint8_t const* b = vb;
    uint8_t const* aend = a + elementSize * aCount;
    uint8_t const* bend = b + elementSize * bCount;

    while (a != aend || b != bend)
    {
        if (a == aend)
        {
            (*in_b_cb)((void*)b, userData);
            b += elementSize;
        }
        else if (b == bend)
        {
            (*in_a_cb)((void*)a, userData);
            a += elementSize;
        }
        else
        {
            int const val = (*compare)(a, b);

            if (val == 0)
            {
                (*in_both_cb)((void*)a, userData);
                a += elementSize;
                b += elementSize;
            }
            else if (val < 0)
            {
                (*in_a_cb)((void*)a, userData);
                a += elementSize;
            }
            else if (val > 0)
            {
                (*in_b_cb)((void*)b, userData);
                b += elementSize;
            }
        }
    }
}

static void sendPex(tr_peerMsgs* msgs)
{
    if (msgs->peerSupportsPex && tr_torrentAllowsPex(msgs->torrent))
    {
        PexDiffs diffs;
        PexDiffs diffs6;
        tr_pex* newPex = NULL;
        tr_pex* newPex6 = NULL;
        int const newCount = tr_peerMgrGetPeers(msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
        int const newCount6 = tr_peerMgrGetPeers(msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);

        /* build the diffs */
        diffs.added = tr_new(tr_pex, newCount);
        diffs.addedCount = 0;
        diffs.dropped = tr_new(tr_pex, msgs->pexCount);
        diffs.droppedCount = 0;
        diffs.elements = tr_new(tr_pex, newCount + msgs->pexCount);
        diffs.elementCount = 0;
        tr_set_compare(msgs->pex, msgs->pexCount, newPex, newCount, tr_pexCompare, sizeof(tr_pex), pexDroppedCb, pexAddedCb,
            pexElementCb, &diffs);
        diffs6.added = tr_new(tr_pex, newCount6);
        diffs6.addedCount = 0;
        diffs6.dropped = tr_new(tr_pex, msgs->pexCount6);
        diffs6.droppedCount = 0;
        diffs6.elements = tr_new(tr_pex, newCount6 + msgs->pexCount6);
        diffs6.elementCount = 0;
        tr_set_compare(msgs->pex6, msgs->pexCount6, newPex6, newCount6, tr_pexCompare, sizeof(tr_pex), pexDroppedCb, pexAddedCb,
            pexElementCb, &diffs6);
        dbgmsg(msgs, "pex: old peer count %d+%d, new peer count %d+%d, added %d+%d, removed %d+%d", msgs->pexCount,
            msgs->pexCount6, newCount, newCount6, diffs.addedCount, diffs6.addedCount, diffs.droppedCount, diffs6.droppedCount);

        if (diffs.addedCount == 0 && diffs.droppedCount == 0 && diffs6.addedCount == 0 && diffs6.droppedCount == 0)
        {
            tr_free(diffs.elements);
            tr_free(diffs6.elements);
        }
        else
        {
            tr_variant val;
            uint8_t* tmp;
            uint8_t* walk;
            struct evbuffer* payload;
            struct evbuffer* out = msgs->outMessages;

            /* update peer */
            tr_free(msgs->pex);
            msgs->pex = diffs.elements;
            msgs->pexCount = diffs.elementCount;
            tr_free(msgs->pex6);
            msgs->pex6 = diffs6.elements;
            msgs->pexCount6 = diffs6.elementCount;

            /* build the pex payload */
            tr_variantInitDict(&val, 3); /* ipv6 support: left as 3: speed vs. likelihood? */

            if (diffs.addedCount > 0)
            {
                /* "added" */
                tmp = walk = tr_new(uint8_t, diffs.addedCount * 6);

                for (int i = 0; i < diffs.addedCount; ++i)
                {
                    memcpy(walk, &diffs.added[i].addr.addr, 4);
                    walk += 4;
                    memcpy(walk, &diffs.added[i].port, 2);
                    walk += 2;
                }

                TR_ASSERT(walk - tmp == diffs.addedCount * 6);
                tr_variantDictAddRaw(&val, TR_KEY_added, tmp, walk - tmp);
                tr_free(tmp);

                /* "added.f"
                 * unset each holepunch flag because we don't support it. */
                tmp = walk = tr_new(uint8_t, diffs.addedCount);

                for (int i = 0; i < diffs.addedCount; ++i)
                {
                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
                }

                TR_ASSERT(walk - tmp == diffs.addedCount);
                tr_variantDictAddRaw(&val, TR_KEY_added_f, tmp, walk - tmp);
                tr_free(tmp);
            }

            if (diffs.droppedCount > 0)
            {
                /* "dropped" */
                tmp = walk = tr_new(uint8_t, diffs.droppedCount * 6);

                for (int i = 0; i < diffs.droppedCount; ++i)
                {
                    memcpy(walk, &diffs.dropped[i].addr.addr, 4);
                    walk += 4;
                    memcpy(walk, &diffs.dropped[i].port, 2);
                    walk += 2;
                }

                TR_ASSERT(walk - tmp == diffs.droppedCount * 6);
                tr_variantDictAddRaw(&val, TR_KEY_dropped, tmp, walk - tmp);
                tr_free(tmp);
            }

            if (diffs6.addedCount > 0)
            {
                /* "added6" */
                tmp = walk = tr_new(uint8_t, diffs6.addedCount * 18);

                for (int i = 0; i < diffs6.addedCount; ++i)
                {
                    memcpy(walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
                    walk += 16;
                    memcpy(walk, &diffs6.added[i].port, 2);
                    walk += 2;
                }

                TR_ASSERT(walk - tmp == diffs6.addedCount * 18);
                tr_variantDictAddRaw(&val, TR_KEY_added6, tmp, walk - tmp);
                tr_free(tmp);

                /* "added6.f"
                 * unset each holepunch flag because we don't support it. */
                tmp = walk = tr_new(uint8_t, diffs6.addedCount);

                for (int i = 0; i < diffs6.addedCount; ++i)
                {
                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
                }

                TR_ASSERT(walk - tmp == diffs6.addedCount);
                tr_variantDictAddRaw(&val, TR_KEY_added6_f, tmp, walk - tmp);
                tr_free(tmp);
            }

            if (diffs6.droppedCount > 0)
            {
                /* "dropped6" */
                tmp = walk = tr_new(uint8_t, diffs6.droppedCount * 18);

                for (int i = 0; i < diffs6.droppedCount; ++i)
                {
                    memcpy(walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
                    walk += 16;
                    memcpy(walk, &diffs6.dropped[i].port, 2);
                    walk += 2;
                }

                TR_ASSERT(walk - tmp == diffs6.droppedCount * 18);
                tr_variantDictAddRaw(&val, TR_KEY_dropped6, tmp, walk - tmp);
                tr_free(tmp);
            }

            /* write the pex message */
            payload = tr_variantToBuf(&val, TR_VARIANT_FMT_BENC);
            evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
            evbuffer_add_uint8(out, BT_LTEP);
            evbuffer_add_uint8(out, msgs->ut_pex_id);
            evbuffer_add_buffer(out, payload);
            pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
            dbgmsg(msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length(out));
            dbgOutMessageLen(msgs);

            evbuffer_free(payload);
            tr_variantFree(&val);
        }

        /* cleanup */
        tr_free(diffs.added);
        tr_free(diffs.dropped);
        tr_free(newPex);
        tr_free(diffs6.added);
        tr_free(diffs6.dropped);
        tr_free(newPex6);

        /* msgs->clientSentPexAt = tr_time(); */
    }
}

static void pexPulse(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmsgs)
{
    struct tr_peerMsgs* msgs = vmsgs;

    sendPex(msgs);

    TR_ASSERT(msgs->pexTimer != NULL);
    tr_timerAdd(msgs->pexTimer, PEX_INTERVAL_SECS, 0);
}

/***
****  tr_peer virtual functions
***/

static bool peermsgs_is_transferring_pieces(struct tr_peer const* peer, uint64_t now, tr_direction direction,
    unsigned int* setme_Bps)
{
    unsigned int Bps = 0;

    if (tr_isPeerMsgs(peer))
    {
        tr_peerMsgs const* msgs = (tr_peerMsgs const*)peer;
        Bps = tr_peerIoGetPieceSpeed_Bps(msgs->io, now, direction);
    }

    if (setme_Bps != NULL)
    {
        *setme_Bps = Bps;
    }

    return Bps > 0;
}

static void peermsgs_destruct(tr_peer* peer)
{
    tr_peerMsgs* msgs = PEER_MSGS(peer);

    TR_ASSERT(msgs != NULL);

    tr_peerMsgsSetActive(msgs, TR_UP, false);
    tr_peerMsgsSetActive(msgs, TR_DOWN, false);

    if (msgs->pexTimer != NULL)
    {
        event_free(msgs->pexTimer);
    }

    if (msgs->incoming.block != NULL)
    {
        evbuffer_free(msgs->incoming.block);
    }

    if (msgs->io != NULL)
    {
        tr_peerIoClear(msgs->io);
        tr_peerIoUnref(msgs->io); /* balanced by the ref in handshakeDoneCB() */
    }

    evbuffer_free(msgs->outMessages);
    tr_free(msgs->pex6);
    tr_free(msgs->pex);

    tr_peerDestruct(&msgs->peer);

    memset(msgs, ~0, sizeof(tr_peerMsgs));
}

static struct tr_peer_virtual_funcs const my_funcs =
{
    .destruct = peermsgs_destruct,
    .is_transferring_pieces = peermsgs_is_transferring_pieces
};

/***
****
***/

time_t tr_peerMsgsGetConnectionAge(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    return tr_peerIoGetAge(msgs->io);
}

bool tr_peerMsgsIsPeerChoked(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    return msgs->peer_is_choked;
}

bool tr_peerMsgsIsPeerInterested(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    return msgs->peer_is_interested;
}

bool tr_peerMsgsIsClientChoked(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    return msgs->client_is_choked;
}

bool tr_peerMsgsIsClientInterested(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    return msgs->client_is_interested;
}

bool tr_peerMsgsIsUtpConnection(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    return msgs->io->socket.type == TR_PEER_SOCKET_TYPE_UTP;
}

bool tr_peerMsgsIsEncrypted(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    return tr_peerIoIsEncrypted(msgs->io);
}

bool tr_peerMsgsIsIncomingConnection(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    return tr_peerIoIsIncoming(msgs->io);
}

/***
****
***/

bool tr_isPeerMsgs(void const* msgs)
{
    /* FIXME: this is pretty crude */
    return msgs != NULL && ((struct tr_peerMsgs*)msgs)->magic_number == MAGIC_NUMBER;
}

tr_peerMsgs* tr_peerMsgsCast(void* vm)
{
    return tr_isPeerMsgs(vm) ? vm : NULL;
}

tr_peerMsgs* tr_peerMsgsNew(struct tr_torrent* torrent, struct tr_peerIo* io, tr_peer_callback callback, void* callbackData)
{
    TR_ASSERT(io != NULL);

    tr_peerMsgs* m = tr_new0(tr_peerMsgs, 1);

    tr_peerConstruct(&m->peer, torrent);
    m->peer.funcs = &my_funcs;

    m->magic_number = MAGIC_NUMBER;
    m->client_is_choked = true;
    m->peer_is_choked = true;
    m->client_is_interested = false;
    m->peer_is_interested = false;
    m->is_active[TR_UP] = false;
    m->is_active[TR_DOWN] = false;
    m->callback = callback;
    m->callbackData = callbackData;
    m->io = io;
    m->torrent = torrent;
    m->state = AWAITING_BT_LENGTH;
    m->outMessages = evbuffer_new();
    m->outMessagesBatchedAt = 0;
    m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;

    if (tr_torrentAllowsPex(torrent))
    {
        m->pexTimer = evtimer_new(torrent->session->event_base, pexPulse, m);
        tr_timerAdd(m->pexTimer, PEX_INTERVAL_SECS, 0);
    }

    if (tr_peerIoSupportsUTP(m->io))
    {
        tr_address const* addr = tr_peerIoGetAddress(m->io, NULL);
        tr_peerMgrSetUtpSupported(torrent, addr);
        tr_peerMgrSetUtpFailed(torrent, addr, false);
    }

    if (tr_peerIoSupportsLTEP(m->io))
    {
        sendLtepHandshake(m);
    }

    tellPeerWhatWeHave(m);

    if (tr_dhtEnabled(torrent->session) && tr_peerIoSupportsDHT(m->io))
    {
        /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
        struct tr_address const* addr = tr_peerIoGetAddress(m->io, NULL);

        if (addr->type == TR_AF_INET || tr_globalIPv6())
        {
            protocolSendPort(m, tr_dhtPort(torrent->session));
        }
    }

    tr_peerIoSetIOFuncs(m->io, canRead, didWrite, gotError, m);
    updateDesiredRequestCount(m);

    return m;
}
